package com.zhang.hadoop.flink.test3;

import com.zhang.hadoop.flink.base.Event;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

/**
 * @author: zhang yufei
 * @createTime:2022/5/21 11:56
 * @description:
 */
public class TransformReduceTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        List<Event> events = new ArrayList<>();
        events.add(new Event("huichao", "./yindao", 1000L));
        events.add(new Event("yanghui", "./yindao", 1000L));
        events.add(new Event("yanghui", "./gangmen", 2000L));
        events.add(new Event("yanghui", "./siwajiao", 3000L));
        events.add(new Event("yuping", "./siwajiao", 2000L));
        events.add(new Event("yuping", "./yindao", 4000L));
        events.add(new Event("yangdan", "./gangmen", 2000L));
        events.add(new Event("yangdan", "./yindao", 6000L));
        events.add(new Event("jingru", "./niaodao", 2000L));
        DataStreamSource<Event> stream = env.fromCollection(events);

        SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2 map(Event event) throws Exception {
                return Tuple2.of(event.user, 1L);
            }
        }).keyBy(data -> data.f0)
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                });
        //2.选取当前最活跃用户
        SingleOutputStreamOperator<Tuple2<String, Long>> result = clicksByUser.keyBy(data -> "key")
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return value1.f1 > value2.f1 ? value1 : value2;
                    }
                });
        result.print();
        env.execute();
    }
}
